package penguin.transfer.netty.client;

import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.timeout.IdleStateHandler;
import org.jboss.netty.util.HashedWheelTimer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import penguin.common.Constants;
import penguin.exception.PenguinException;
import penguin.transfer.TransferClient;
import penguin.transfer.TransferListener;
import penguin.transfer.dto.TransferDTO;
import penguin.transfer.netty.common.BlockAndGet;
import penguin.transfer.netty.common.NettyCodecAdapter;
import penguin.thread.NamedThreadFactory;

import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;

/**
 * Netty 3 based {@link TransferClient} that sends {@link TransferDTO} requests
 * over a single channel and blocks the caller until the matching response
 * (correlated by uuid) arrives.
 *
 * <p>The instance is also the last handler of its own pipeline, so the
 * {@code channelXxx}/{@code messageReceived} callbacks run on Netty I/O
 * threads while {@link #sendSync(Object)} runs on caller threads. The state
 * shared between them ({@code channel}, {@code isRunning}) is therefore
 * declared {@code volatile}.
 *
 * <p>Created on 15/7/3 10:07 AM
 *
 * @author Wang Jianhua (penguin83@126.com)
 */
public class NettyClient extends SimpleChannelHandler implements TransferClient {

    private static final Logger LOGGER = LoggerFactory.getLogger(NettyClient.class);

    // Closing a ChannelFactory leaks direct memory, so one static instance is
    // shared by all clients as a workaround.
    // https://issues.jboss.org/browse/NETTY-424
    private static final ChannelFactory channelFactory = new NioClientSocketChannelFactory(
            Executors.newCachedThreadPool(new NamedThreadFactory("NettyClientBoss", true)),
            Executors.newCachedThreadPool(new NamedThreadFactory("NettyClientWorker", true)),
            Constants.DEFAULT_IO_THREADS);

    // One timer shared by every client: each HashedWheelTimer owns a worker
    // thread, so creating one per init() call leaked a thread per client.
    private static final HashedWheelTimer IDLE_TIMER = new HashedWheelTimer();

    private String host;
    private int port;

    private ClientBootstrap bootstrap;

    // Written by Netty I/O threads, read by caller threads.
    private volatile Channel channel;
    private volatile boolean isRunning = false;

    // Pending synchronous requests, keyed by the uuid of the request DTO.
    private final ConcurrentHashMap<String, BlockAndGet> blockAndGetMap =
            new ConcurrentHashMap<String, BlockAndGet>();

    private TransferListener transferListener;

    public NettyClient() {
        super();
    }

    /**
     * Configures the bootstrap and pipeline for the given endpoint. Does not
     * open a connection; call {@link #connect()} afterwards.
     *
     * @param host             remote host to connect to
     * @param port             remote port to connect to
     * @param transferListener callback notified of connect, disconnect and error events
     */
    public void init(String host, int port, TransferListener transferListener) {
        this.host = host;
        this.port = port;
        this.isRunning = false;
        this.transferListener = transferListener;

        final ChannelHandler idleStateHandler =
                new IdleStateHandler(IDLE_TIMER, Constants.HEART_TIME_OUT, Constants.HEART_TIME, 0);

        ClientBootstrap newBootstrap = new ClientBootstrap(channelFactory);
        // config
        // @see org.jboss.netty.channel.socket.SocketChannelConfig
        newBootstrap.setOption("keepAlive", true);
        newBootstrap.setOption("tcpNoDelay", true);
        newBootstrap.setOption("connectTimeoutMillis", Constants.CONNECT_TIME_OUT);
        newBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter();
                ChannelPipeline pipeline = Channels.pipeline();

                pipeline.addLast("idle", idleStateHandler);
                pipeline.addLast("aware", new NettyClientIdleStateAwareChannelHandler());
                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("handler", NettyClient.this);
                return pipeline;
            }
        });
        this.bootstrap = newBootstrap;
    }

    /**
     * Starts an asynchronous connection attempt to the configured endpoint.
     * The outcome is reported through the channel callbacks of this handler.
     *
     * @throws IllegalStateException if {@link #init} has not been called
     */
    public void connect() {
        if (bootstrap == null) {
            throw new IllegalStateException("connect() called before init()");
        }
        LOGGER.info("start connect {}:{}", host, port);
        bootstrap.connect(new InetSocketAddress(host, port));
    }

    /**
     * Marks the client as stopped and closes the current channel, if any.
     * Safe to call repeatedly.
     */
    public void close() {
        isRunning = false;

        // Detach the field first so a disconnect callback triggered by the
        // close below sees a consistent "no channel" state.
        Channel current = channel;
        channel = null;

        if (current != null) {
            try {
                current.close();
            } catch (Exception ex) {
                // Best effort: a failed close must not propagate, but should be visible.
                LOGGER.warn("close channel failed [" + host + ":" + port + "]", ex);
            }
        }
    }

    /**
     * Sends {@code object} to the server and blocks until the response with
     * the same uuid arrives or the wait inside {@link BlockAndGet} ends.
     *
     * @param object payload wrapped into a {@link TransferDTO}
     * @return the response payload, never {@code null}
     * @throws PenguinException if the server answered with an error
     * @throws Exception        if the client is not running, the channel is
     *                          missing or not writable, the wait was
     *                          interrupted, or no response was received
     */
    public Object sendSync(Object object) throws Exception {

        if (!isRunning) {
            LOGGER.error("[sendSync]:is not running");
            throw new Exception("[sendSync]:is not running");
        }

        // Read the volatile field once so the null check and the write below
        // operate on the same channel even if close() runs concurrently.
        Channel current = channel;
        if (current == null) {
            LOGGER.error("[sendSync]:channel is null");
            throw new Exception("[sendSync]:channel is null");
        }

        if (!current.isWritable()) {
            LOGGER.error("[sendSync]:channel is not writable");
            throw new Exception("[sendSync]:channel is not writable");
        }

        TransferDTO transferDTO = TransferDTO.buildMessage(object);
        String uuid = transferDTO.getUuid();
        BlockAndGet blockAndGet = new BlockAndGet();

        // Register the waiter BEFORE writing: otherwise a fast response can
        // reach messageReceived() while the map has no entry for this uuid.
        blockAndGetMap.put(uuid, blockAndGet);
        try {
            current.write(transferDTO);
            blockAndGet.lock();
        } catch (InterruptedException e) {
            // Restore the interrupt flag for callers further up the stack.
            Thread.currentThread().interrupt();
            LOGGER.error("sendSync   interrupted", e);
            throw new Exception("[sendSync]: interrupted", e);
        } finally {
            // Always drop the waiter so finished or timed-out requests do not pile up.
            blockAndGetMap.remove(uuid);
        }

        if (blockAndGet.getException() != null) {
            throw new PenguinException(blockAndGet.getException());
        }

        Object result = blockAndGet.getObject();
        if (result == null) {
            LOGGER.error("sendSync  time out");
            throw new Exception("[sendSync]: time out");
        }

        return result;
    }

    /**
     * @return {@code true} if a channel exists and reports itself connected
     */
    @Override
    public boolean isConnected() {
        Channel current = channel;
        return current != null && current.isConnected();
    }

    /**
     * Records the newly connected channel, marks the client as running and
     * notifies the listener with the remote address.
     */
    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        Channel connected = e.getChannel();
        this.channel = connected;
        this.isRunning = true;

        InetSocketAddress remote = (InetSocketAddress) connected.getRemoteAddress();
        String remoteHost = remote.getAddress().getHostAddress();
        int remotePort = remote.getPort();
        LOGGER.debug("channelConnected[{}:{}]", remoteHost, remotePort);

        transferListener.connected(remoteHost, remotePort);
    }

    /**
     * Marks the client as stopped, releases the channel and notifies the
     * listener. The remote address is taken from the event's channel because
     * the {@code channel} field may already have been cleared by
     * {@link #close()}; if the address is unavailable the configured
     * host/port are reported instead.
     */
    @Override
    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        this.isRunning = false;

        String remoteHost = host;
        int remotePort = port;
        InetSocketAddress remote = (InetSocketAddress) e.getChannel().getRemoteAddress();
        if (remote != null) {
            if (remote.getAddress() != null) {
                remoteHost = remote.getAddress().getHostAddress();
            }
            remotePort = remote.getPort();
        }

        LOGGER.debug("channelDisconnected[{}:{}]", remoteHost, remotePort);
        this.close();
        transferListener.disConnected(remoteHost, remotePort);
    }

    /**
     * Hands a response to the caller waiting in {@link #sendSync(Object)}.
     * Heartbeat responses are ignored, as are responses whose waiter is
     * already gone (for example after the caller gave up).
     */
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        TransferDTO dto = (TransferDTO) e.getMessage();
        // Heartbeat responses need no handling.
        if ("heart".equals(dto.getType())) {
            return;
        }

        BlockAndGet waiter = blockAndGetMap.get(dto.getUuid());
        if (waiter == null) {
            LOGGER.warn("messageReceived: no pending request for uuid " + dto.getUuid());
            return;
        }

        // Signal the waiter exactly once: either with the error or with the result.
        if (dto.isHasException()) {
            waiter.unlockAndException((String) dto.getObject());
        } else {
            waiter.setAndUnlock(dto.getObject());
        }
    }

    /**
     * Delegates to the default downstream handling of the superclass.
     */
    @Override
    public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        super.writeRequested(ctx, e);
    }

    /**
     * Logs the failure, then closes the channel if it is still connected;
     * otherwise forwards the event to the listener.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        LOGGER.error("exceptionCaught[" + host + ":" + port + "]", e.getCause());

        Channel failed = e.getChannel();

        if (failed != null && failed.isConnected()) {
            failed.close();
        } else {
            transferListener.exceptionCaught(ctx, e);
        }
    }
}
